use crate::checks::Checks;
use crate::types::ServiceStatus;
use futures::future;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::RwLock;
use tokio::sync::broadcast;

#[cfg(unix)]
use nix::unistd::Pid;

/// Manages the lifecycle of the dev services: spawning processes, tracking
/// their status, killing them (and their process groups), and broadcasting
/// log lines and status-change notifications to subscribers.
pub struct ServiceManager {
    // Spawned child handle per service name (None used as a placeholder slot).
    processes: Arc<RwLock<HashMap<String, Option<Child>>>>,
    // Current lifecycle status per service name.
    status: Arc<RwLock<HashMap<String, ServiceStatus>>>,
    // Per-service flag selecting the watch (auto-rebuild) command variant.
    watching: Arc<RwLock<HashMap<String, bool>>>,
    // Set while shutting down; start_service_with_perf refuses to spawn when true.
    quitting: Arc<RwLock<bool>>,
    // Broadcasts (service name, line, unix-seconds timestamp, "stdout"/"stderr").
    log_sender: broadcast::Sender<(String, String, u64, String)>,
    status_change_sender: broadcast::Sender<()>, // Notifies when status changes
    // Repository root that commands are run from.
    base_dir: std::path::PathBuf,
}

impl ServiceManager {
    /// Construct a manager rooted at `base_dir`.
    ///
    /// Every known service begins as `Stopped`. Watch flags are restored from
    /// `saved_watching`; services without a saved entry default to watching
    /// enabled (all services on).
    pub fn new(
        base_dir: std::path::PathBuf,
        log_sender: broadcast::Sender<(String, String, u64, String)>,
        saved_watching: HashMap<String, bool>,
    ) -> Self {
        let mut status = HashMap::new();
        let mut watching = HashMap::new();

        for service in crate::services::get_services() {
            let name = service.name.clone();
            // Restore the saved watch flag; anything unknown defaults to true.
            let should_watch = saved_watching.get(&name).copied().unwrap_or(true);
            status.insert(name.clone(), ServiceStatus::Stopped);
            watching.insert(name, should_watch);
        }

        let (status_change_sender, _) = broadcast::channel(100);

        Self {
            processes: Arc::new(RwLock::new(HashMap::new())),
            status: Arc::new(RwLock::new(status)),
            watching: Arc::new(RwLock::new(watching)),
            quitting: Arc::new(RwLock::new(false)),
            log_sender,
            status_change_sender,
            base_dir,
        }
    }

    /// Shared handle to the live per-service status map.
    pub fn get_status(&self) -> Arc<RwLock<HashMap<String, ServiceStatus>>> {
        Arc::clone(&self.status)
    }

    /// Shared handle to the live per-service watch-flag map.
    pub fn get_watching(&self) -> Arc<RwLock<HashMap<String, bool>>> {
        Arc::clone(&self.watching)
    }

    /// New handle to the channel that fires whenever any service's status changes.
    pub fn get_status_change_sender(&self) -> broadcast::Sender<()> {
        self.status_change_sender.clone()
    }

    /// Snapshot (owned copy) of the current status of every service.
    pub async fn get_status_map(&self) -> HashMap<String, ServiceStatus> {
        let guard = self.status.read().await;
        guard.clone()
    }

    /// Snapshot (owned copy) of the current watch flag of every service.
    pub async fn get_watching_map(&self) -> HashMap<String, bool> {
        let guard = self.watching.read().await;
        guard.clone()
    }

    /// Bring up the whole stack in dependency order:
    /// types -> core -> shared -> db migrations -> client / python / api ->
    /// multiplayer / files / connection. Mirrors the node `dev` script's order.
    pub async fn start_all_services(&self) {
        // Mark everything as pending up front so subscribers see progress
        // immediately; Killed services are left alone (manual restart only).
        {
            let mut status = self.status.write().await;
            for service in crate::services::get_services() {
                let current_status = status
                    .get(&service.name)
                    .cloned()
                    .unwrap_or(ServiceStatus::Stopped);
                if !matches!(current_status, ServiceStatus::Killed) {
                    status.insert(service.name.clone(), ServiceStatus::Starting);
                }
            }
        }
        self.notify_status_change();

        // Start all services by default (in logical order: dependencies first)
        // Start types first (needed for core)
        self.start_service("types").await;

        // Start core (needed for client)
        self.start_service("core").await;

        // Start shared (needed for api) and wait for it to complete
        self.start_service("shared").await;

        // Wait for shared to complete before starting API
        // This matches node dev behavior where shared -> db -> api
        self.wait_for_service_success("shared").await;

        // Run database migrations before starting API (matches node dev behavior)
        self.run_db_migrations().await;

        // Start client (can run in parallel with api)
        self.start_service("client").await;

        // Start python
        self.start_service("python").await;

        // Start API (depends on shared and migrations)
        self.start_service("api").await;

        // Wait for API to be ready before starting dependent services
        self.wait_for_service_success("api").await;

        // Start Rust services (depend on API)
        self.start_service("multiplayer").await;
        self.start_service("files").await;
        self.start_service("connection").await;
    }

    /// Poll until `name` reports `Running`, or bail early on `Error`/`Killed`,
    /// giving up after 60 seconds. Every exit path logs and returns so the
    /// caller's startup sequence can always continue.
    async fn wait_for_service_success(&self, name: &str) {
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(60);

        loop {
            // Take a snapshot of the status so the lock is released before logging.
            let current = self.status.read().await.get(name).cloned();

            match current {
                Some(ServiceStatus::Running) => {
                    self.log(name, format!("{} is ready", name)).await;
                    return;
                }
                Some(ServiceStatus::Error) => {
                    self.log(name, format!("{} failed, continuing anyway", name))
                        .await;
                    return;
                }
                Some(ServiceStatus::Killed) => {
                    self.log(name, format!("{} was killed, skipping wait", name))
                        .await;
                    return;
                }
                _ => {}
            }

            if std::time::Instant::now() > deadline {
                self.log(
                    name,
                    format!("{} did not become ready within timeout, continuing", name),
                )
                .await;
                return;
            }

            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
    }

    /// Run database migrations (matches node dev runDb behavior)
    ///
    /// There is no dedicated "db" service entry; progress is surfaced purely
    /// through the "db" log channel, and failure is non-fatal.
    async fn run_db_migrations(&self) {
        self.log("db", "Running database migrations...".to_string())
            .await;

        let result = Command::new("npm")
            .args(["run", "prisma:migrate", "--workspace=quadratic-api"])
            .current_dir(&self.base_dir)
            .output()
            .await;

        match result {
            Ok(out) => {
                let stdout = String::from_utf8_lossy(&out.stdout);
                let stderr = String::from_utf8_lossy(&out.stderr);

                // Forward every non-blank line of migration output to the log.
                for line in stdout.lines().chain(stderr.lines()) {
                    if !line.trim().is_empty() {
                        self.log("db", line.to_string()).await;
                    }
                }

                let summary = if out.status.success() {
                    "Database migrations completed successfully"
                } else {
                    "Database migrations failed, API may not work correctly"
                };
                self.log("db", summary.to_string()).await;
            }
            Err(e) => {
                self.log("db", format!("Failed to run migrations: {}", e))
                    .await;
            }
        }
    }

    /// Start `name` without performance instrumentation (`perf = false`).
    pub async fn start_service(&self, name: &str) {
        self.start_service_with_perf(name, false).await;
    }

    /// Spawn the named service and wire its stdout/stderr into the log
    /// broadcast channel, updating the service's status whenever an output
    /// line matches its configured success/error/start patterns.
    ///
    /// `perf` is passed through to the service's command builder (performance
    /// variant). No-ops while the manager is quitting or when the service is
    /// `Killed` (killed services must be restarted manually). "checks" is a
    /// pseudo-service: it runs the checks suite and returns without spawning.
    pub async fn start_service_with_perf(&self, name: &str, perf: bool) {
        // Don't spawn anything during shutdown.
        if *self.quitting.read().await {
            return;
        }

        // Don't start services that are killed - they must be manually restarted
        {
            let status = self.status.read().await;
            if matches!(status.get(name), Some(ServiceStatus::Killed)) {
                return;
            }
        }

        // Handle checks service specially - run checks and exit
        if name == "checks" {
            let checks = Checks::new(
                self.log_sender.clone(),
                self.status.clone(),
                self.status_change_sender.clone(),
                self.base_dir.clone(),
            );
            checks.run().await;
            return;
        }

        // Unknown service names are silently ignored.
        let service = crate::services::get_service_by_name(name);
        let Some(service) = service else {
            return;
        };

        // Check if there's an existing process before killing
        let had_existing_process = {
            let processes = self.processes.read().await;
            processes.get(name).and_then(|p| p.as_ref()).is_some()
        };

        // Only kill if there's an existing process
        if had_existing_process {
            self.kill_service(name).await;
        }

        // For services with ports, check if port is in use and kill if needed
        if let Some(port) = service.port() {
            // Only kill processes on port if we had an existing process or port is actually in use
            if had_existing_process || !self.is_port_free(port).await {
                // Kill any processes still using the port
                self.kill_processes_on_port(port).await;

                // Give the OS a moment to release the port after killing processes
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

                // Wait up to 2 seconds for the port to be released
                if !self.wait_for_port_free(port, 2000).await {
                    self.log(
                        name,
                        format!(
                            "Warning: Port {} may still be in use, proceeding anyway",
                            port
                        ),
                    )
                    .await;
                } else {
                    self.log(name, format!("Port {} is now free", port)).await;
                }
            }
        }

        // Missing watch entry is treated as "not watching".
        let watching = *self.watching.read().await.get(name).unwrap_or(&false);
        let base_dir = self.base_dir.clone();

        // Log the command being run for debugging (especially for shared)
        if name == "shared" {
            let service_config = service.config();
            let command = if watching {
                service_config.watch_command.as_ref()
            } else {
                Some(&service_config.command)
            };
            if let Some(cmd) = command {
                self.log(name, format!("Running command: {}", cmd.join(" ")))
                    .await;
            }
        }

        let mut cmd = service.build_command(watching, perf, &base_dir);

        // Spawn failure marks the service Error and notifies subscribers.
        let mut child = match cmd.spawn() {
            Ok(c) => c,
            Err(e) => {
                self.log(name, format!("Failed to start: {}", e)).await;
                let mut status = self.status.write().await;
                status.insert(name.to_string(), ServiceStatus::Error);
                drop(status);
                self.notify_status_change();
                return;
            }
        };

        // Log that we've started the process (for debugging)
        self.log(name, "Process spawned successfully".to_string())
            .await;

        // Update status
        {
            let mut status = self.status.write().await;
            status.insert(name.to_string(), ServiceStatus::Starting);
        }
        self.notify_status_change();

        self.log(name, "Starting...".to_string()).await;

        // Handle stdout: a background task forwards each line to the log
        // channel and scans it against the service's status patterns.
        let stdout = child.stdout.take();
        let stdout_handle = if let Some(stdout) = stdout {
            let name = name.to_string();
            let sender = self.log_sender.clone();
            let status = self.status.clone();
            let service_config = service.config();
            let status_notifier = self.status_change_sender.clone();

            Some(tokio::spawn(async move {
                let reader = BufReader::new(stdout);
                let mut lines = reader.lines();
                loop {
                    match lines.next_line().await {
                        Ok(Some(line)) => {
                            let timestamp = SystemTime::now()
                                .duration_since(UNIX_EPOCH)
                                .unwrap_or_default()
                                .as_secs();
                            let _ = sender.send((
                                name.clone(),
                                line.clone(),
                                timestamp,
                                "stdout".to_string(),
                            ));

                            // Check for success/error patterns using service methods
                            // Don't update status if service is killed
                            let mut status_guard = status.write().await;
                            let current_status = status_guard
                                .get(&name)
                                .cloned()
                                .unwrap_or(ServiceStatus::Stopped);

                            // Don't change status if service is killed
                            // (`continue` drops the guard before the next read)
                            if matches!(current_status, ServiceStatus::Killed) {
                                continue;
                            }

                            // Transition to appropriate state whenever pattern is detected, regardless of current state
                            let mut status_changed = false;
                            if service_config
                                .success_patterns
                                .iter()
                                .any(|p| line.contains(p))
                            {
                                status_guard.insert(name.clone(), ServiceStatus::Running);
                                status_changed = true;
                            } else if service_config
                                .error_patterns
                                .iter()
                                .any(|p| line.contains(p))
                            {
                                status_guard.insert(name.clone(), ServiceStatus::Error);
                                status_changed = true;
                            } else if service_config
                                .start_patterns
                                .iter()
                                .any(|p| line.contains(p))
                            {
                                status_guard.insert(name.clone(), ServiceStatus::Starting);
                                status_changed = true;
                            }
                            drop(status_guard);

                            // Notify only after the lock is released.
                            if status_changed {
                                let _ = status_notifier.send(());
                            }
                        }
                        Ok(None) => {
                            break;
                        }
                        Err(e) => {
                            eprintln!("Error reading stdout for {}: {}", name, e);
                            break;
                        }
                    }
                }
            }))
        } else {
            None
        };

        // Handle stderr: same pattern scanning as stdout, but cargo-based
        // services get non-error stderr lines relabeled as "stdout".
        let stderr = child.stderr.take();
        let stderr_handle = if let Some(stderr) = stderr {
            if name == "shared" {
                self.log(name, "Setting up stderr reader".to_string()).await;
            }
            let name = name.to_string();
            let sender = self.log_sender.clone();
            let status = self.status.clone();
            let service_config = service.config();
            let status_notifier = self.status_change_sender.clone();

            // For cargo-based services (like core), stderr contains normal informational output
            // We'll check if it's an actual error, otherwise treat as stdout
            let is_cargo_service =
                name == "core" || name == "multiplayer" || name == "files" || name == "connection";
            let error_patterns = service_config.error_patterns.clone();

            Some(tokio::spawn(async move {
                let reader = BufReader::new(stderr);
                let mut lines = reader.lines();
                loop {
                    match lines.next_line().await {
                        Ok(Some(line)) => {
                            let timestamp = SystemTime::now()
                                .duration_since(UNIX_EPOCH)
                                .unwrap_or_default()
                                .as_secs();

                            // Check if this line matches error patterns
                            let is_error = error_patterns.iter().any(|p| line.contains(p));

                            // For cargo-based services: if it's an error, mark as stderr; otherwise as stdout
                            // For other services: always mark as stderr
                            let stream = if is_cargo_service {
                                if is_error { "stderr" } else { "stdout" }
                            } else {
                                "stderr"
                            };

                            let _ = sender.send((
                                name.clone(),
                                line.clone(),
                                timestamp,
                                stream.to_string(),
                            ));

                            // Check for success/error/start patterns to update status
                            // Don't update status if service is killed
                            let mut status_guard = status.write().await;
                            let current_status = status_guard
                                .get(&name)
                                .cloned()
                                .unwrap_or(ServiceStatus::Stopped);

                            // Don't change status if service is killed
                            if matches!(current_status, ServiceStatus::Killed) {
                                continue;
                            }

                            // Check patterns in order: success, error, start
                            let mut status_changed = false;
                            if service_config
                                .success_patterns
                                .iter()
                                .any(|p| line.contains(p))
                            {
                                status_guard.insert(name.clone(), ServiceStatus::Running);
                                status_changed = true;
                            } else if is_error {
                                status_guard.insert(name.clone(), ServiceStatus::Error);
                                status_changed = true;
                            } else if service_config
                                .start_patterns
                                .iter()
                                .any(|p| line.contains(p))
                            {
                                status_guard.insert(name.clone(), ServiceStatus::Starting);
                                status_changed = true;
                            }
                            drop(status_guard);

                            if status_changed {
                                let _ = status_notifier.send(());
                            }
                        }
                        Ok(None) => {
                            break;
                        }
                        Err(e) => {
                            eprintln!("Error reading stderr for {}: {}", name, e);
                            break;
                        }
                    }
                }
            }))
        } else {
            None
        };

        // Store the process - the child's PID is its PGID due to pre_exec setpgid(0,0)
        // NOTE(review): the setpgid call is presumably in `build_command`'s
        // pre_exec hook, which is outside this file — confirm there.
        {
            let mut processes = self.processes.write().await;
            processes.insert(name.to_string(), Some(child));
        }

        // For one-time commands (like shared compile or types), wait for reading tasks to finish
        // This ensures all output is captured even if the process exits quickly
        let is_one_time_command = (name == "shared" && !watching) || name == "types";
        if is_one_time_command {
            if let Some(handle) = stdout_handle {
                let _ = handle.await;
            }
            if let Some(handle) = stderr_handle {
                let _ = handle.await;
            }
        }
    }

    /// Stop the named service and make a best effort to ensure nothing it
    /// spawned survives.
    ///
    /// On Unix: first kill anything on the service's port and any cargo/rustc
    /// processes in its directory, then SIGKILL the child's whole process
    /// group (its PGID equals its PID because of the `setpgid(0,0)` pre_exec
    /// hook), wait up to 5 seconds, verify the PID is actually gone, and fall
    /// back to port-based killing if anything failed. No-op when we never
    /// spawned this service.
    pub async fn kill_service(&self, name: &str) {
        // Get the service's port and cwd before killing (for fallback)
        let service_port = crate::services::get_service_by_name(name).and_then(|s| s.port());
        let service_cwd = crate::services::get_service_by_name(name).and_then(|s| s.cwd());

        // Kill the spawned process and its entire process group
        let mut processes = self.processes.write().await;
        if let Some(Some(mut child)) = processes.remove(name) {
            let pid = child.id();
            drop(processes); // Release the lock before async operations

            // For services with ports, kill processes on that port first (catches child processes)
            if let Some(port) = service_port {
                self.kill_processes_on_port(port).await;
            }

            // For cargo-based services, also kill cargo and rustc processes in the service directory
            if let Some(cwd) = service_cwd {
                self.kill_cargo_processes(&cwd).await;
            }

            // Check if the process already exited to avoid killing unrelated reused PIDs
            match child.try_wait() {
                Ok(Some(status)) => {
                    self.log(
                        name,
                        format!("Process already exited with status: {:?}", status.code()),
                    )
                    .await;
                    return;
                }
                Ok(None) => {
                    // Still running, continue with kill logic
                }
                Err(e) => {
                    self.log(
                        name,
                        format!("Failed to check process state before killing: {}", e),
                    )
                    .await;
                }
            }

            let mut kill_succeeded = false;

            #[cfg(unix)]
            {
                // The child's PID is its PGID due to unsafe_pre_exec setpgid(0,0)
                // Kill the entire process group using kill with negative PID
                if let Some(pid) = pid {
                    // Safety check: don't kill our own process
                    let current_pid = nix::unistd::getpid();
                    if pid as i32 == current_pid.as_raw() {
                        eprintln!(
                            "ERROR: Attempted to kill process {} which is dev-rust itself! Aborting for {}.",
                            pid, name
                        );
                        return;
                    }

                    // Kill the entire process group (PGID = PID due to setpgid in unsafe_pre_exec)
                    // Using negative PID tells kill() to kill the process group
                    match nix::sys::signal::kill(
                        Pid::from_raw(-(pid as i32)),
                        nix::sys::signal::Signal::SIGKILL,
                    ) {
                        Ok(()) => {
                            kill_succeeded = true;
                        }
                        Err(e) => {
                            self.log(
                                name,
                                format!("Warning: Failed to kill process group: {}", e),
                            )
                            .await;
                            kill_succeeded = false;
                        }
                    }
                }
            }
            #[cfg(not(unix))]
            {
                // On Windows, just kill the process. Use `start_kill` (the
                // synchronous variant): tokio's `Child::kill` is async and
                // returns a Future, so `child.kill().is_ok()` does not compile;
                // `start_kill` sends the kill and lets `wait` below reap it.
                kill_succeeded = child.start_kill().is_ok();
            }

            // Wait for the process to fully exit with a timeout
            let wait_result =
                tokio::time::timeout(tokio::time::Duration::from_secs(5), child.wait()).await;

            match wait_result {
                Ok(Ok(status)) => {
                    self.log(
                        name,
                        format!("Process exited with status: {:?}", status.code()),
                    )
                    .await;
                }
                Ok(Err(e)) => {
                    self.log(name, format!("Error waiting for process: {}", e))
                        .await;
                }
                Err(_) => {
                    self.log(
                        name,
                        "Process did not exit within timeout, but continuing anyway".to_string(),
                    )
                    .await;
                }
            }

            // Verify the process is actually gone (Unix only)
            #[cfg(unix)]
            {
                if let Some(pid) = pid {
                    // Check if the process still exists
                    let check_result = nix::sys::signal::kill(
                        Pid::from_raw(pid as i32),
                        nix::sys::signal::Signal::SIGCONT, // Use SIGCONT (harmless) to check if process exists
                    );
                    if check_result.is_ok() {
                        // Process still exists, try killing again
                        self.log(
                            name,
                            format!("Process {} still exists, sending SIGKILL again", pid),
                        )
                        .await;
                        match nix::sys::signal::kill(
                            Pid::from_raw(-(pid as i32)),
                            nix::sys::signal::Signal::SIGKILL,
                        ) {
                            Ok(()) => {
                                kill_succeeded = true;
                            }
                            Err(_) => {
                                kill_succeeded = false;
                            }
                        }
                        // Wait a bit more
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    } else {
                        // Process is gone, kill succeeded
                        kill_succeeded = true;
                    }
                }
            }

            // If we failed to kill the process and the service has a port, kill all processes on that port
            if !kill_succeeded {
                if let Some(port) = service_port {
                    self.log(
                        name,
                        format!(
                            "Failed to kill process, killing all processes on port {}",
                            port
                        ),
                    )
                    .await;
                    self.kill_processes_on_port(port).await;
                } else {
                    self.log(
                        name,
                        "Failed to kill process and service has no port to fall back to"
                            .to_string(),
                    )
                    .await;
                }
            }
        }
    }

    /// Best-effort check that nothing currently holds TCP `port`.
    async fn is_port_free(&self, port: u16) -> bool {
        #[cfg(unix)]
        {
            // `lsof -ti tcp:<port>` prints one PID per line for each holder of
            // the port; empty stdout therefore means the port is free.
            let result = Command::new("lsof")
                .args(["-ti", &format!("tcp:{}", port)])
                .output()
                .await;

            match result {
                Ok(out) => out.stdout.is_empty(),
                // If lsof itself failed, be conservative and report "in use".
                Err(_) => false,
            }
        }
        #[cfg(not(unix))]
        {
            // On Windows, probe by attempting to bind the port directly.
            use std::net::TcpListener;
            TcpListener::bind(format!("127.0.0.1:{}", port)).is_ok()
        }
    }

    /// Poll `is_port_free` every 50ms until the port frees up or `timeout_ms`
    /// elapses. Returns `true` iff the port became free in time.
    async fn wait_for_port_free(&self, port: u16, timeout_ms: u64) -> bool {
        let deadline =
            std::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);

        loop {
            if std::time::Instant::now() >= deadline {
                return false;
            }
            if self.is_port_free(port).await {
                return true;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }
    }

    /// Kill cargo and rustc processes associated with the service living at
    /// `cwd` (Unix only; a no-op elsewhere).
    ///
    /// Two sweeps: (1) `pgrep -f "cargo.*<package>"` for cargo invocations of
    /// the package, killed as process group then directly; (2) `pgrep -f
    /// "--crate-name <crate>"` for rustc compiles of the crate (crate name =
    /// package name with `-` replaced by `_`), killed directly. Our own PID is
    /// always skipped, and all kill errors are deliberately ignored
    /// (processes may already be gone).
    async fn kill_cargo_processes(&self, cwd: &str) {
        #[cfg(unix)]
        {
            // Extract the package name from cwd (e.g., "quadratic-files" from "quadratic-files")
            let package_name = cwd.split('/').next_back().unwrap_or(cwd);
            // Convert to crate name format (replace - with _)
            let crate_name = package_name.replace('-', "_");

            // Find cargo processes related to this specific package
            // Match patterns like "cargo.*quadratic-files" or "cargo.*-p quadratic-files"
            let pattern = format!("cargo.*{}", package_name);
            let output = Command::new("pgrep").args(["-f", &pattern]).output().await;

            if let Ok(output) = output {
                // pgrep prints one matching PID per line.
                let pids_str = String::from_utf8_lossy(&output.stdout);
                let current_pid = nix::unistd::getpid().as_raw();

                for pid_str in pids_str.trim().split('\n').filter(|s| !s.is_empty()) {
                    if let Ok(pid) = pid_str.trim().parse::<i32>() {
                        // Safety: never signal ourselves.
                        if pid == current_pid {
                            continue;
                        }

                        // Kill the process and its process group
                        // Try process group first (more thorough)
                        let _ = nix::sys::signal::kill(
                            Pid::from_raw(-pid), // Negative PID kills process group
                            nix::sys::signal::Signal::SIGKILL,
                        );
                        // Also kill the process directly as fallback
                        let _ = nix::sys::signal::kill(
                            Pid::from_raw(pid),
                            nix::sys::signal::Signal::SIGKILL,
                        );
                    }
                }
            }

            // Kill rustc processes that are compiling this specific crate
            // rustc uses crate names (underscores) like "--crate-name quadratic_files"
            let rustc_pattern = format!("--crate-name {}", crate_name);
            let rustc_output = Command::new("pgrep")
                .args(["-f", &rustc_pattern])
                .output()
                .await;

            if let Ok(rustc_output) = rustc_output {
                let pids_str = String::from_utf8_lossy(&rustc_output.stdout);
                let current_pid = nix::unistd::getpid().as_raw();

                for pid_str in pids_str.trim().split('\n').filter(|s| !s.is_empty()) {
                    if let Ok(pid) = pid_str.trim().parse::<i32>() {
                        if pid == current_pid {
                            continue;
                        }
                        // Kill rustc processes related to this crate
                        let _ = nix::sys::signal::kill(
                            Pid::from_raw(pid),
                            nix::sys::signal::Signal::SIGKILL,
                        );
                    }
                }
            }
        }
        #[cfg(not(unix))]
        {
            // On Windows, cargo processes are handled differently
            // The port-based killing should be sufficient
        }
    }

    /// Force-kills every process currently bound to TCP `port`.
    ///
    /// Unix: enumerates PIDs via `lsof -ti tcp:PORT` and sends `SIGKILL`.
    /// Windows: parses `netstat -ano` for LISTENING sockets on the port and
    /// runs `taskkill /F /PID`.
    ///
    /// The current process is always skipped, even if it owns the port.
    /// Best-effort: every outcome (killed, failed, none found) is logged to
    /// the "system" stream and nothing is returned to the caller.
    async fn kill_processes_on_port(&self, port: u16) {
        #[cfg(unix)]
        {
            // Use lsof to find all processes using the port
            // (`-t` prints bare PIDs one per line, `-i tcp:PORT` filters by TCP port).
            let output = Command::new("lsof")
                .args(["-ti", &format!("tcp:{}", port)])
                .output()
                .await;

            if let Ok(output) = output {
                // NOTE: lsof exits non-zero when nothing matches; we don't check
                // the exit status here, we just get an empty PID list below.
                let pids_str = String::from_utf8_lossy(&output.stdout);
                let pids: Vec<&str> = pids_str
                    .trim()
                    .split('\n')
                    .filter(|s| !s.is_empty())
                    .collect();

                if pids.is_empty() {
                    self.log("system", format!("No processes found on port {}", port))
                        .await;
                    return;
                }

                // Safety check: don't kill our own process
                let current_pid = nix::unistd::getpid().as_raw();

                for pid_str in pids {
                    if let Ok(pid) = pid_str.trim().parse::<i32>() {
                        // Don't kill our own process
                        if pid == current_pid {
                            self.log(
                                "system",
                                format!("Skipping our own process {} on port {}", pid, port),
                            )
                            .await;
                            continue;
                        }

                        // Kill the process (SIGKILL: cannot be caught or ignored).
                        match nix::sys::signal::kill(
                            Pid::from_raw(pid),
                            nix::sys::signal::Signal::SIGKILL,
                        ) {
                            Ok(()) => {
                                self.log(
                                    "system",
                                    format!("Killed process {} on port {}", pid, port),
                                )
                                .await;
                            }
                            Err(e) => {
                                // e.g. ESRCH if the process exited between lsof and kill,
                                // or EPERM if it belongs to another user.
                                self.log(
                                    "system",
                                    format!(
                                        "Failed to kill process {} on port {}: {}",
                                        pid, port, e
                                    ),
                                )
                                .await;
                            }
                        }
                    }
                }
            } else {
                // Spawning lsof itself failed (e.g. binary not installed).
                self.log(
                    "system",
                    format!(
                        "Failed to find processes on port {}: lsof command failed",
                        port
                    ),
                )
                .await;
            }
        }
        #[cfg(not(unix))]
        {
            // On Windows, use netstat and taskkill
            // Find processes using the port
            let output = Command::new("netstat").args(["-ano"]).output().await;

            if let Ok(output) = output {
                let output_str = String::from_utf8_lossy(&output.stdout);
                let port_str = format!(":{}", port);

                // Parse netstat output to find PIDs. Only LISTENING sockets are
                // considered; the PID is the last whitespace-separated column.
                // NOTE(review): `contains(&port_str)` can also match ports that
                // merely end with the same digits (e.g. ":8080" matches ":18080") —
                // acceptable for a dev tool, but worth confirming.
                let mut pids = Vec::new();
                for line in output_str.lines() {
                    if line.contains(&port_str) && line.contains("LISTENING") {
                        // Extract PID from the last column
                        let parts: Vec<&str> = line.split_whitespace().collect();
                        if let Some(pid_str) = parts.last() {
                            if let Ok(pid) = pid_str.parse::<u32>() {
                                pids.push(pid);
                            }
                        }
                    }
                }

                if pids.is_empty() {
                    self.log("system", format!("No processes found on port {}", port))
                        .await;
                    return;
                }

                // Kill each process (/F = force-terminate).
                for pid in pids {
                    let output = Command::new("taskkill")
                        .args(["/F", "/PID", &pid.to_string()])
                        .output()
                        .await;

                    match output {
                        Ok(output) if output.status.success() => {
                            self.log("system", format!("Killed process {} on port {}", pid, port))
                                .await;
                        }
                        Ok(_) => {
                            // taskkill ran but reported failure (bad PID, access denied).
                            self.log(
                                "system",
                                format!("Failed to kill process {} on port {}", pid, port),
                            )
                            .await;
                        }
                        Err(e) => {
                            // Spawning taskkill itself failed.
                            self.log(
                                "system",
                                format!("Error killing process {} on port {}: {}", pid, port, e),
                            )
                            .await;
                        }
                    }
                }
            } else {
                self.log(
                    "system",
                    format!(
                        "Failed to find processes on port {}: netstat command failed",
                        port
                    ),
                )
                .await;
            }
        }
    }

    /// Restarts every service: clears the quitting flag, force-kills all
    /// tracked service processes (and, on Unix, anything still bound to their
    /// ports), resets all statuses to `Stopped`, then starts everything again
    /// in dependency order via `start_all_services`.
    pub async fn restart_all_services(&self) {
        // Reset quitting flag so start_service is allowed to spawn processes.
        {
            let mut quitting = self.quitting.write().await;
            *quitting = false;
        }

        self.log("system", "Restarting all services...".to_string())
            .await;

        // Collect everything to kill while holding the locks, then release
        // them before the (slow) kill phase. On Unix a raw PID suffices; on
        // Windows the Child handle must be retained to kill the process.
        #[cfg(unix)]
        let mut pids_and_ports: Vec<(String, Option<u32>, Option<u16>)> = Vec::new();
        #[cfg(not(unix))]
        let mut processes_to_kill: Vec<(String, Child, Option<u16>)> = Vec::new();

        {
            let mut processes = self.processes.write().await;
            let status = self.status.read().await;

            for name in status.keys() {
                #[cfg(unix)]
                {
                    if let Some(Some(child)) = processes.remove(name) {
                        let pid = child.id();
                        let port =
                            crate::services::get_service_by_name(name).and_then(|s| s.port());
                        pids_and_ports.push((name.clone(), pid, port));
                    }
                }
                #[cfg(not(unix))]
                {
                    if let Some(Some(child)) = processes.remove(name) {
                        let port =
                            crate::services::get_service_by_name(name).and_then(|s| s.port());
                        processes_to_kill.push((name.clone(), child, port));
                    }
                }
            }
        }

        // Kill all processes in parallel.
        let mut kill_futures = Vec::new();

        #[cfg(unix)]
        {
            for (_name, pid, port) in pids_and_ports {
                kill_futures.push(async move {
                    if let Some(pid) = pid {
                        // Safety check: never signal our own process (group).
                        let current_pid = nix::unistd::getpid();
                        if pid as i32 != current_pid.as_raw() {
                            // Negative PID targets the whole process group
                            // (PGID == PID because children are spawned with setpgid).
                            let _ = nix::sys::signal::kill(
                                Pid::from_raw(-(pid as i32)),
                                nix::sys::signal::Signal::SIGKILL,
                            );
                        }
                    }

                    // Also sweep the service's port for lingering processes.
                    if let Some(port) = port
                        && let Ok(output) = Command::new("lsof")
                            .args(["-ti", &format!("tcp:{}", port)])
                            .output()
                            .await
                    {
                        let pids_str = String::from_utf8_lossy(&output.stdout);
                        let current_pid = nix::unistd::getpid().as_raw();
                        for pid_str in pids_str.trim().split('\n') {
                            if let Ok(pid) = pid_str.trim().parse::<i32>()
                                && pid != current_pid
                            {
                                let _ = nix::sys::signal::kill(
                                    Pid::from_raw(pid),
                                    nix::sys::signal::Signal::SIGKILL,
                                );
                            }
                        }
                    }
                });
            }
        }

        #[cfg(not(unix))]
        {
            for (_name, mut child, _port) in processes_to_kill {
                kill_futures.push(async move {
                    // Fix: tokio's Child::kill is async — the returned future
                    // must be awaited or the process is never actually killed.
                    let _ = child.kill().await;
                });
            }
        }

        // Execute all kills in parallel, bounded by a short timeout so a
        // wedged process cannot stall the restart.
        tokio::time::timeout(
            tokio::time::Duration::from_millis(500),
            future::join_all(kill_futures),
        )
        .await
        .ok();

        // Reset all service statuses to Stopped so start_service will run them.
        {
            let mut status = self.status.write().await;
            for name in status.keys().cloned().collect::<Vec<_>>() {
                status.insert(name, ServiceStatus::Stopped);
            }
        }
        self.notify_status_change();

        // Small delay to let OS-level cleanup (e.g. port release) complete.
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // Now start all services in the proper dependency order.
        self.start_all_services().await;

        self.log("system", "All services restarted.".to_string())
            .await;
    }

    /// Stops every service without setting the quitting flag: force-kills all
    /// tracked processes (and, on Unix, anything lingering on their ports) in
    /// parallel, then marks every service `Killed` so it can be restarted.
    pub async fn stop_all_services(&self) {
        self.log("system", "Stopping all services...".to_string())
            .await;

        // Collect everything to kill while holding the locks, then release
        // them before the slow kill phase. This mirrors kill_all_services but
        // leaves the quitting flag untouched so services may start again.
        #[cfg(not(unix))]
        let mut processes_to_kill: Vec<(String, Child, Option<u16>)> = Vec::new();
        #[cfg(unix)]
        let mut pids_and_ports: Vec<(String, Option<u32>, Option<u16>)> = Vec::new();

        {
            let mut processes = self.processes.write().await;
            let status = self.status.read().await;

            for name in status.keys() {
                #[cfg(unix)]
                {
                    // On Unix a raw PID is enough; the Child handle can be dropped.
                    if let Some(Some(child)) = processes.remove(name) {
                        let pid = child.id();
                        let port =
                            crate::services::get_service_by_name(name).and_then(|s| s.port());
                        pids_and_ports.push((name.clone(), pid, port));
                    }
                }
                #[cfg(not(unix))]
                {
                    // On Windows the Child handle is needed to kill the process.
                    if let Some(Some(child)) = processes.remove(name) {
                        let port =
                            crate::services::get_service_by_name(name).and_then(|s| s.port());
                        processes_to_kill.push((name.clone(), child, port));
                    }
                }
            }
        }

        // Kill all processes in parallel.
        let mut kill_futures = Vec::new();

        #[cfg(unix)]
        {
            for (_name, pid, port) in pids_and_ports {
                kill_futures.push(async move {
                    if let Some(pid) = pid {
                        // Safety check: don't kill our own process
                        let current_pid = nix::unistd::getpid();
                        if pid as i32 == current_pid.as_raw() {
                            return;
                        }

                        // Kill the entire process group (PGID = PID due to setpgid)
                        let _ = nix::sys::signal::kill(
                            Pid::from_raw(-(pid as i32)),
                            nix::sys::signal::Signal::SIGKILL,
                        );
                    }

                    // Also kill any processes on the port (in case of lingering processes)
                    if let Some(port) = port
                        && let Ok(output) = Command::new("lsof")
                            .args(["-ti", &format!("tcp:{}", port)])
                            .output()
                            .await
                    {
                        let pids_str = String::from_utf8_lossy(&output.stdout);
                        let current_pid = nix::unistd::getpid().as_raw();
                        for pid_str in pids_str.trim().split('\n') {
                            if let Ok(pid) = pid_str.trim().parse::<i32>()
                                && pid != current_pid
                            {
                                let _ = nix::sys::signal::kill(
                                    Pid::from_raw(pid),
                                    nix::sys::signal::Signal::SIGKILL,
                                );
                            }
                        }
                    }
                });
            }
        }

        #[cfg(not(unix))]
        {
            for (_name, mut child, _port) in processes_to_kill {
                kill_futures.push(async move {
                    // Fix: tokio's Child::kill is async — the returned future
                    // must be awaited or the process is never actually killed.
                    let _ = child.kill().await;
                });
            }
        }

        // Execute all kills in parallel with a timeout so a wedged process
        // cannot stall the stop.
        tokio::time::timeout(
            tokio::time::Duration::from_millis(500),
            future::join_all(kill_futures),
        )
        .await
        .ok();

        // Mark all services as Killed (so they can be restarted).
        {
            let mut status = self.status.write().await;
            for name in status.keys().cloned().collect::<Vec<_>>() {
                status.insert(name, ServiceStatus::Killed);
            }
        }
        self.notify_status_change();

        self.log("system", "All services stopped.".to_string())
            .await;
    }

    /// Shuts everything down for program exit: sets the quitting flag so no
    /// service can be (re)started, then force-kills every tracked process
    /// (and, on Unix, anything lingering on the services' ports) in parallel,
    /// bounded by a 500 ms timeout.
    pub async fn kill_all_services(&self) {
        // Set quitting flag to prevent new services from starting.
        {
            let mut quitting = self.quitting.write().await;
            *quitting = true;
        }

        self.log("system", "Shutting down all services...".to_string())
            .await;

        // Collect all processes to kill while the locks are held, releasing
        // them before the slow kill phase. Windows needs the Child handles.
        #[cfg(not(unix))]
        let mut processes_to_kill: Vec<(String, Child, Option<u16>)> = Vec::new();
        #[cfg(unix)]
        let mut pids_and_ports: Vec<(String, Option<u32>, Option<u16>)> = Vec::new();

        {
            let mut processes = self.processes.write().await;
            let status = self.status.read().await;

            for name in status.keys() {
                #[cfg(unix)]
                {
                    if let Some(Some(child)) = processes.remove(name) {
                        let pid = child.id();
                        let port =
                            crate::services::get_service_by_name(name).and_then(|s| s.port());
                        // On Unix, we can kill by PID, so we don't need the Child handle
                        pids_and_ports.push((name.clone(), pid, port));
                    }
                }
                #[cfg(not(unix))]
                {
                    // No `mut` needed here: the Child is only moved, not mutated.
                    if let Some(Some(child)) = processes.remove(name) {
                        let port =
                            crate::services::get_service_by_name(name).and_then(|s| s.port());
                        // On Windows, we need the Child handle to kill
                        processes_to_kill.push((name.clone(), child, port));
                    }
                }
            }
        }

        // Kill all processes in parallel.
        let mut kill_futures = Vec::new();

        #[cfg(unix)]
        {
            // On Unix, kill by PID (fast, no need to wait for Child).
            for (_name, pid, port) in pids_and_ports {
                kill_futures.push(async move {
                    if let Some(pid) = pid {
                        // Safety check: don't kill our own process
                        let current_pid = nix::unistd::getpid();
                        if pid as i32 == current_pid.as_raw() {
                            return;
                        }

                        // Kill the entire process group (PGID = PID due to setpgid)
                        let _ = nix::sys::signal::kill(
                            Pid::from_raw(-(pid as i32)),
                            nix::sys::signal::Signal::SIGKILL,
                        );
                    }

                    // Also kill any processes on the port (in case of lingering processes)
                    if let Some(port) = port
                        && let Ok(output) = Command::new("lsof")
                            .args(["-ti", &format!("tcp:{}", port)])
                            .output()
                            .await
                    {
                        let pids_str = String::from_utf8_lossy(&output.stdout);
                        let current_pid = nix::unistd::getpid().as_raw();
                        for pid_str in pids_str.trim().split('\n') {
                            if let Ok(pid) = pid_str.trim().parse::<i32>()
                                && pid != current_pid
                            {
                                let _ = nix::sys::signal::kill(
                                    Pid::from_raw(pid),
                                    nix::sys::signal::Signal::SIGKILL,
                                );
                            }
                        }
                    }
                });
            }
        }

        #[cfg(not(unix))]
        {
            // On Windows, kill using the retained Child handles; child.kill()
            // is sufficient for processes we spawned ourselves. Port-based
            // cleanup could be added here if lingering processes are observed.
            for (_name, mut child, _port) in processes_to_kill {
                kill_futures.push(async move {
                    // Fix: tokio's Child::kill is async — the returned future
                    // must be awaited or the process is never actually killed.
                    let _ = child.kill().await;
                });
            }
        }

        // Execute all kills in parallel with a short timeout so shutdown
        // cannot hang on a wedged process.
        tokio::time::timeout(
            tokio::time::Duration::from_millis(500),
            future::join_all(kill_futures),
        )
        .await
        .ok();

        self.log("system", "All services stopped.".to_string())
            .await;
    }

    /// Flips the watch flag for `name`, then (re)starts the service so it
    /// picks up the matching watcher / non-watcher command.
    pub async fn toggle_watch(&self, name: &str) {
        // Flip the flag; an unknown service is treated as currently unwatched.
        {
            let mut watching = self.watching.write().await;
            let flipped = !watching.get(name).copied().unwrap_or(false);
            watching.insert(name.to_string(), flipped);
        }

        // A Killed service must be reset to Stopped, or start_service will
        // refuse to run it.
        {
            let mut status = self.status.write().await;
            if let Some(ServiceStatus::Killed) = status.get(name) {
                status.insert(name.to_string(), ServiceStatus::Stopped);
            }
        }

        // Restart the service with the new watch setting.
        self.start_service(name).await;
    }

    /// Toggles a single service between killed and running: a Killed service
    /// is restarted, anything else is killed and marked Killed.
    pub async fn kill_service_toggle(&self, name: &str) {
        // An absent entry counts as Stopped, i.e. not killed.
        let is_killed = matches!(
            self.status.read().await.get(name),
            Some(ServiceStatus::Killed)
        );

        if is_killed {
            // Bring it back: mark Stopped first so start_service accepts it.
            {
                let mut status = self.status.write().await;
                status.insert(name.to_string(), ServiceStatus::Stopped);
            }
            self.notify_status_change();
            self.start_service(name).await;
        } else {
            // Take it down and record the Killed state.
            self.kill_service(name).await;
            {
                let mut status = self.status.write().await;
                status.insert(name.to_string(), ServiceStatus::Killed);
            }
            self.notify_status_change();
        }
    }

    /// Sets the watch flag for `name` explicitly. When the flag actually
    /// changes, the service is restarted with the matching command variant;
    /// otherwise only the stored flag is refreshed.
    pub async fn set_watch(&self, name: &str, watching: bool) {
        // Determine whether this is a real state change before writing.
        let changed = {
            let watch_map = self.watching.read().await;
            watch_map.get(name).copied().unwrap_or(false) != watching
        };

        // Record the (possibly unchanged) flag in either case.
        {
            let mut watch_map = self.watching.write().await;
            watch_map.insert(name.to_string(), watching);
        }

        if !changed {
            return;
        }

        // Allow a Killed service to be started again by start_service.
        {
            let mut status = self.status.write().await;
            if let Some(ServiceStatus::Killed) = status.get(name) {
                status.insert(name.to_string(), ServiceStatus::Stopped);
            }
        }

        // Restart with the appropriate (watcher or non-watcher) command.
        self.start_service(name).await;
    }

    /// Broadcasts `message` as a "stdout" log line attributed to `service`,
    /// stamped with the current Unix time in whole seconds. Send errors
    /// (no subscribers) are intentionally ignored.
    async fn log(&self, service: &str, message: String) {
        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_secs())
            .unwrap_or(0);
        self.log_sender
            .send((service.to_string(), message, now_secs, "stdout".to_string()))
            .ok();
    }

    /// Wakes status subscribers after a status-map change; a send failure
    /// (nobody currently listening) is deliberately ignored.
    fn notify_status_change(&self) {
        self.status_change_sender.send(()).ok();
    }
}
